Redis源码学习二:RedisCluster源码阅读(1)

Redis Cluster 采用 P2P(去中心化)的方式实现分布式:每个 Redis 节点既要存储数据,又要维护集群状态。节点之间使用 Gossip 协议进行通信,因此每个节点既是客户端又是服务端。所以,了解节点间通信的代码很重要。

1
所有代码均在cluster.h和cluster.c中。

1.Redis Cluster节点在初始化时,会创建AcceptTcpHandler(clusterAcceptHandler)。

可以看出,Redis Cluster 节点之间使用 server.port + REDIS_CLUSTER_PORT_INCR(即客户端端口 + 10000)这个端口进行集群内部通信。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/* Set up the cluster bus listening sockets for this node.
 *
 * Verifies that the configured client port leaves room for the cluster bus
 * port (server.port + REDIS_CLUSTER_PORT_INCR), opens the listening sockets
 * for that port through listenToPort(), and registers clusterAcceptHandler
 * as the readable-event handler of every listening descriptor.
 *
 * The process exits when the port is out of range or the bus port cannot be
 * listened on; redisPanic() is invoked if a file event cannot be created. */
void clusterInit(void) {
    /* Port sanity check II
     * The other handshake port check is triggered too late to stop
     * us from trying to use a too-high cluster port number. */
    if (server.port > (65535-REDIS_CLUSTER_PORT_INCR)) {
        /* The guard above accepts a port equal to the limit, so the message
         * states "55535 or less" rather than "lower than 55535". */
        redisLog(REDIS_WARNING, "Redis port number too high. "
                   "Cluster communication port is 10,000 port "
                   "numbers higher than your Redis port. "
                   "Your Redis port number must be "
                   "55535 or less.");
        exit(1);
    }

    if (listenToPort(server.port+REDIS_CLUSTER_PORT_INCR,
        server.cfd,&server.cfd_count) == REDIS_ERR)
    {
        exit(1);
    } else {
        int j;

        /* One accept handler per listening socket of the cluster bus. */
        for (j = 0; j < server.cfd_count; j++) {
            if (aeCreateFileEvent(server.el, server.cfd[j], AE_READABLE,
                clusterAcceptHandler, NULL) == AE_ERR)
                redisPanic("Unrecoverable error creating Redis Cluster "
                            "file event.");
        }
    }
    /* ... remainder of clusterInit omitted in this excerpt ... */
}

clusterInit 会在 redis.c 的 initServer 中被调用:初始化 Redis 实例时,如果开启了集群模式(server.cluster_enabled),就执行 clusterInit。

1
2
3
4
5
6
/* Server initialization routine (excerpt: the parts of the body that are
 * unrelated to the cluster are elided by the placeholder lines). */
void initServer(void) {
...忽略...
/* Only set up the cluster subsystem when cluster mode is enabled. */
if (server.cluster_enabled) clusterInit();
...忽略...
}

2. clusterAcceptHandler 的定义:接受其他节点的连接后,为每个新连接创建 clusterLink,并注册 clusterReadHandler 作为可读事件的处理函数,之后该连接上的数据由 clusterReadHandler 处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/* Readable-event handler installed on the cluster bus listening sockets.
 *
 * Accepts pending connections on 'fd', at most MAX_CLUSTER_ACCEPTS_PER_CALL
 * per invocation. Every accepted socket is switched to non-blocking mode,
 * passed to anetEnableTcpNoDelay(), wrapped in a fresh clusterLink and
 * registered with clusterReadHandler as its readable-event handler.
 *
 * 'el', 'privdata' and 'mask' are unused. */
void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    char peer_ip[REDIS_IP_STR_LEN];
    int peer_port;
    int budget;

    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);
    REDIS_NOTUSED(privdata);

    /* If the server is starting up, don't accept cluster connections:
     * UPDATE messages may interact with the database content. */
    if (server.masterhost == NULL && server.loading) return;

    for (budget = MAX_CLUSTER_ACCEPTS_PER_CALL; budget != 0; budget--) {
        clusterLink *conn;
        int conn_fd = anetTcpAccept(server.neterr, fd, peer_ip,
                                    sizeof(peer_ip), &peer_port);

        if (conn_fd == ANET_ERR) {
            /* EWOULDBLOCK is not reported: only other failures are
             * logged. Either way stop accepting for this call. */
            if (errno != EWOULDBLOCK) {
                redisLog(REDIS_VERBOSE,
                    "Error accepting cluster node: %s", server.neterr);
            }
            return;
        }

        /* Use non-blocking I/O for cluster messages. */
        anetNonBlock(NULL,conn_fd);
        anetEnableTcpNoDelay(NULL,conn_fd);
        redisLog(REDIS_VERBOSE,"Accepted cluster node %s:%d", peer_ip, peer_port);

        /* Create the link object used to handle this connection; it is
         * handed to the readable handler whenever data is available.
         * Initially link->node is NULL because the identity of the peer is
         * not known yet; the right node is referenced once it is known. */
        conn = createClusterLink(NULL);
        conn->fd = conn_fd;
        aeCreateFileEvent(server.el,conn_fd,AE_READABLE,clusterReadHandler,conn);
    }
}

3. clusterReadHandler:解析请求,其中clusterProcessPacket是处理各种消息的逻辑(例如ping,pong,meet等)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
/* Readable-event handler for an established cluster bus link.
 *
 * 'privdata' is the clusterLink of the connection; incoming bytes are
 * accumulated in link->rcvbuf. The first 8 bytes of a message are read
 * first because they give the full message length; the remainder is then
 * read in chunks of at most sizeof(clusterMsg) bytes. As soon as the buffer
 * holds exactly hdr->totlen bytes the packet is passed to
 * clusterProcessPacket() and the buffer is reset for the next message.
 *
 * The function returns when read() reports EAGAIN, when the link is torn
 * down through handleLinkIOError() (bad header, I/O error, peer closed), or
 * when clusterProcessPacket() reports that the link is no longer valid.
 * 'el' and 'mask' are unused. */
void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    clusterLink *link = (clusterLink*) privdata;
    char chunk[sizeof(clusterMsg)];

    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);

    for (;;) { /* Keep reading as long as data is available. */
        clusterMsg *msg;
        ssize_t got;
        unsigned int want;
        unsigned int have = sdslen(link->rcvbuf);

        if (have >= 8) {
            msg = (clusterMsg*) link->rcvbuf;

            /* Validate signature and length once, right when exactly the
             * 8-byte prefix has been buffered. */
            if (have == 8 &&
                (memcmp(msg->sig,"RCmb",4) != 0 ||
                 ntohl(msg->totlen) < CLUSTERMSG_MIN_LEN))
            {
                redisLog(REDIS_WARNING,
                    "Bad message length or signature received "
                    "from Cluster bus.");
                handleLinkIOError(link);
                return;
            }

            /* Read what is still missing, capped to the stack buffer. */
            want = ntohl(msg->totlen) - have;
            if (want > sizeof(chunk)) want = sizeof(chunk);
        } else {
            /* Still collecting the first 8 bytes of the message. */
            want = 8 - have;
        }

        got = read(fd,chunk,want);
        if (got == -1 && errno == EAGAIN) return; /* No more data ready. */
        if (got <= 0) {
            /* I/O error, or the peer closed the connection. */
            redisLog(REDIS_DEBUG,"I/O error reading from node link: %s",
                (got == 0) ? "connection closed" : strerror(errno));
            handleLinkIOError(link);
            return;
        }

        /* Append the chunk; the sds buffer may have been reallocated, so
         * re-derive the header pointer from it. */
        link->rcvbuf = sdscatlen(link->rcvbuf,chunk,got);
        msg = (clusterMsg*) link->rcvbuf;
        have += got;

        /* Not a complete packet yet: go read some more. */
        if (have < 8 || have != ntohl(msg->totlen)) continue;

        if (!clusterProcessPacket(link)) return; /* Link no longer valid. */

        /* Packet consumed: start over with an empty receive buffer. */
        sdsfree(link->rcvbuf);
        link->rcvbuf = sdsempty();
    }
}